import ProcessFunctionScalaV2.DataJast
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

/**
  * Keyed process function that accumulates `DataJast.count` per key and periodically
  * reports the accumulated total from a processing-time timer.
  *
  * Two pieces of keyed state are kept:
  *  - `valueState`: the total accumulated since the last report;
  *  - `pendingTimerState`: the timestamp of the timer currently registered for the key.
  *
  * The pending-timer state exists so that only one timer chain is alive per key. The total
  * cannot serve as that marker, because `onTimer` resets it to 0 while also registering the
  * next timer; using "total == 0" as the trigger would register an additional timer on the
  * first element after every report.
  *
  * Nothing is emitted through the collector; results are only printed.
  */
class MyProcessFunction extends KeyedProcessFunction[String, DataJast, DataJast] {

  /** Interval between two reports, in milliseconds (30 seconds). */
  val delayTime: Long = 1000L * 30

  /** Per-key total accumulated since the last report. */
  lazy val valueState: ValueState[Long] = getRuntimeContext.getState[Long](new ValueStateDescriptor[Long]("ccount", classOf[Long]))

  /** Per-key timestamp of the registered report timer; unset (null) while no timer is pending. */
  private lazy val pendingTimerState: ValueState[java.lang.Long] =
    getRuntimeContext.getState[java.lang.Long](new ValueStateDescriptor[java.lang.Long]("pendingReportTimer", classOf[java.lang.Long]))

  /**
    * Adds `value.count` to the total of the current key and makes sure a report timer is
    * pending for that key.
    *
    * @param value incoming record whose `count` is accumulated
    * @param ctx   context giving access to the timer service
    * @param out   collector; not used by this function
    */
  override def processElement(value: DataJast, ctx: KeyedProcessFunction[String, DataJast, DataJast]#Context, out: Collector[DataJast]): Unit = {
    val subtaskIndex = getRuntimeContext.getIndexOfThisSubtask
    // A state that was never written reads as 0 here: Flink hands back null and Scala
    // unboxes a null Long to 0L.
    val accumulated: Long = valueState.value()

    if (accumulated == 0) {
      valueState.update(value.count)
      printf("运行task:%s,第一次初始化数量:%s\n", subtaskIndex, value.count)
    } else {
      val updated = accumulated + value.count
      valueState.update(updated)
      printf("运行task:%s,更新统计结果:%s\n", subtaskIndex, updated)
    }

    // Register a timer only when none is pending for this key. After the first registration
    // onTimer keeps the chain alive, so registering again here would multiply the timers.
    if (pendingTimerState.value() == null) {
      val fireTime: Long = ctx.timerService().currentProcessingTime() + delayTime
      ctx.timerService().registerProcessingTimeTimer(fireTime)
      pendingTimerState.update(java.lang.Long.valueOf(fireTime))
    }
  }

  /**
    * Prints the total accumulated for the current key, resets it to 0 and registers the
    * next report timer.
    *
    * @param timestamp timestamp of the timer that fired
    * @param ctx       context giving access to the current key and the timer service
    * @param out       collector; not used by this function
    */
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, DataJast, DataJast]#OnTimerContext, out: Collector[DataJast]): Unit = {
    // Timer fired: business logic for the finished interval can be added here.
    printf("运行task:%s,触发定时器,30秒内数据一共,key:%s,value:%s\n", getRuntimeContext.getIndexOfThisSubtask, ctx.getCurrentKey, valueState.value())

    // The interval has been reported; start the next one from zero.
    valueState.update(0L)

    // Re-arm the timer and record it, so processElement does not register a second one.
    val nextFireTime: Long = ctx.timerService().currentProcessingTime() + delayTime
    ctx.timerService().registerProcessingTimeTimer(nextFireTime)
    pendingTimerState.update(java.lang.Long.valueOf(nextFireTime))
  }
}